/*******************************************************************************
 *     ___                  _   ____  ____
 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
 *   | | | | | | |/ _ \/ __| __| | | |  _ \
 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
 *    \__\_\\__,_|\___||___/\__|____/|____/
 *
 *  Copyright (c) 2014-2019 Appsicle
 *  Copyright (c) 2019-2023 QuestDB
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

package io.questdb.griffin.engine.groupby;

import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.PlanSink;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.GroupByFunction;
import io.questdb.griffin.engine.functions.SymbolFunction;
import io.questdb.griffin.engine.functions.TimestampFunction;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import org.jetbrains.annotations.Nullable;

public abstract class AbstractNoRecordSampleByCursor extends AbstractSampleByCursor {
    protected final ObjList<GroupByFunction> groupByFunctions;
    protected final GroupByFunctionsUpdater groupByFunctionsUpdater;
    protected final int timestampIndex;
    private final ObjList<Function> recordFunctions;
    protected RecordCursor baseCursor;
    protected Record baseRecord;
    protected SqlExecutionCircuitBreaker circuitBreaker;
    // this epoch is generally the same as `sampleLocalEpoch` except for cases where
    // sampler passed thru Daytime Savings Transition date
    // diverging values tell `filling` implementations not to fill this gap
    protected long nextSampleLocalEpoch;
    protected long sampleLocalEpoch;
    protected long topTzOffset;
    private boolean areTimestampsInitialized;
    private boolean isNotKeyedLoopInitialized;
    private long topLocalEpoch;
    private long topNextDst;

    public AbstractNoRecordSampleByCursor(
            ObjList<Function> recordFunctions,
            int timestampIndex, // index of timestamp column in base cursor
            TimestampSampler timestampSampler,
            ObjList<GroupByFunction> groupByFunctions,
            GroupByFunctionsUpdater groupByFunctionsUpdater,
            Function timezoneNameFunc,
            int timezoneNameFuncPos,
            Function offsetFunc,
            int offsetFuncPos
    ) {
        super(timestampSampler, timezoneNameFunc, timezoneNameFuncPos, offsetFunc, offsetFuncPos);
        this.timestampIndex = timestampIndex;
        this.recordFunctions = recordFunctions;
        this.groupByFunctions = groupByFunctions;
        this.groupByFunctionsUpdater = groupByFunctionsUpdater;
    }

    @Override
    public void close() {
        Misc.free(baseCursor);
        Misc.clearObjList(groupByFunctions);
        circuitBreaker = null;
    }

    @Override
    public SymbolTable getSymbolTable(int columnIndex) {
        return (SymbolTable) recordFunctions.getQuick(columnIndex);
    }

    @Override
    public SymbolTable newSymbolTable(int columnIndex) {
        return ((SymbolFunction) recordFunctions.getQuick(columnIndex)).newSymbolTable();
    }

    public void of(RecordCursor baseCursor, SqlExecutionContext executionContext) throws SqlException {
        prevDst = Long.MIN_VALUE;
        parseParams(baseCursor, executionContext);
        topNextDst = nextDstUtc;
        this.baseCursor = baseCursor;
        baseRecord = baseCursor.getRecord();
        circuitBreaker = executionContext.getCircuitBreaker();
        isNotKeyedLoopInitialized = false;
        areTimestampsInitialized = false;
    }

    @Override
    public long size() {
        return -1;
    }

    @Override
    public void toTop() {
        GroupByUtils.toTop(recordFunctions);
        baseCursor.toTop();
        localEpoch = topLocalEpoch;
        sampleLocalEpoch = nextSampleLocalEpoch = topLocalEpoch;
        // timezone offset is liable to change when we pass over DST edges
        tzOffset = topTzOffset;
        prevDst = Long.MIN_VALUE;
        nextDstUtc = topNextDst;
        baseRecord = baseCursor.getRecord();
        isNotKeyedLoopInitialized = false;
        areTimestampsInitialized = false;
    }

    private void kludge(long newTzOffset) {
        // time moved forward, we need to make sure we move our sample boundary
        sampleLocalEpoch += (newTzOffset - tzOffset);
        nextSampleLocalEpoch = sampleLocalEpoch;
        tzOffset = newTzOffset;
    }

    protected long adjustDst(long timestamp, @Nullable MapValue mapValue, long nextSampleTimestamp) {
        final long utcTimestamp = timestamp - tzOffset;
        if (utcTimestamp < nextDstUtc) {
            return timestamp;
        }
        final long newTzOffset = rules.getOffset(utcTimestamp);
        prevDst = nextDstUtc;
        nextDstUtc = rules.getNextDST(utcTimestamp);
        // check if DST takes this timestamp back "before" the nextSampleTimestamp
        if (timestamp - (tzOffset - newTzOffset) < nextSampleTimestamp) {
            // time moved backwards, we need to check if we should be collapsing this
            // hour into previous period or not
            updateValueWhenClockMovesBack(mapValue);
            nextSampleLocalEpoch = timestampSampler.round(timestamp);
            localEpoch = nextSampleLocalEpoch;
            sampleLocalEpoch += (newTzOffset - tzOffset);
            tzOffset = newTzOffset;
            return Long.MIN_VALUE;
        }
        kludge(newTzOffset);

        // time moved forward, we need to make sure we move our sample boundary
        return utcTimestamp + newTzOffset;
    }

    protected void adjustDstInFlight(long utcEpoch) {
        if (utcEpoch < nextDstUtc) {
            return;
        }
        final long daylightSavings = rules.getOffset(utcEpoch);
        prevDst = nextDstUtc;
        nextDstUtc = rules.getNextDST(utcEpoch);
        kludge(daylightSavings);
    }

    protected long getBaseRecordTimestamp() {
        return baseRecord.getTimestamp(timestampIndex) + tzOffset;
    }

    protected void initTimestamps() {
        if (areTimestampsInitialized) {
            return;
        }

        if (!baseCursor.hasNext()) {
            baseRecord = null;
            return;
        }

        final long timestamp = baseRecord.getTimestamp(timestampIndex);
        if (rules != null) {
            tzOffset = rules.getOffset(timestamp);
            nextDstUtc = rules.getNextDST(timestamp);
        }

        if (tzOffset == 0 && fixedOffset == Long.MIN_VALUE) {
            // this is the default path, we align time intervals to the first observation
            timestampSampler.setStart(timestamp);
        } else {
            timestampSampler.setStart(fixedOffset != Long.MIN_VALUE ? fixedOffset : 0L);
        }
        topTzOffset = tzOffset;
        topNextDst = nextDstUtc;
        topLocalEpoch = localEpoch = timestampSampler.round(timestamp + tzOffset);
        sampleLocalEpoch = nextSampleLocalEpoch = localEpoch;
        areTimestampsInitialized = true;
    }

    protected void nextSamplePeriod(long timestamp) {
        localEpoch = timestampSampler.round(timestamp);
        // Sometimes rounding, especially around Days can throw localEpoch
        // to the "before" previous DST. When this happens we need to compensate for
        // tzOffset subtraction at the time of delivery of the timestamp to client
        if (localEpoch - tzOffset < prevDst) {
            localEpoch += tzOffset;
        }
        GroupByUtils.toTop(groupByFunctions);
    }

    protected boolean notKeyedLoop(MapValue mapValue) {
        if (!isNotKeyedLoopInitialized) {
            sampleLocalEpoch = localEpoch;
            nextSampleLocalEpoch = localEpoch;
            // looks like we need to populate key map
            // at the start of this loop 'lastTimestamp' will be set to timestamp
            // of first record in base cursor
            groupByFunctionsUpdater.updateNew(mapValue, baseRecord);
            isNotKeyedLoopInitialized = true;
        }

        long next = timestampSampler.nextTimestamp(localEpoch);
        while (baseCursor.hasNext()) {
            long timestamp = getBaseRecordTimestamp();
            if (timestamp < next) {
                circuitBreaker.statefulThrowExceptionIfTripped();

                adjustDstInFlight(timestamp - tzOffset);
                groupByFunctionsUpdater.updateExisting(mapValue, baseRecord);
            } else {
                // timestamp changed, make sure we keep the value of 'lastTimestamp'
                // unchanged. Timestamp columns uses this variable
                // When map is exhausted we would assign 'next' to 'lastTimestamp'
                // and build another map
                timestamp = adjustDst(timestamp, mapValue, next);
                if (timestamp != Long.MIN_VALUE) {
                    nextSamplePeriod(timestamp);
                    isNotKeyedLoopInitialized = false;
                    return true;
                }
            }
        }
        // opportunity, after we stream map that's it
        baseRecord = null;
        isNotKeyedLoopInitialized = false;
        return true;
    }

    protected void updateValueWhenClockMovesBack(MapValue value) {
        groupByFunctionsUpdater.updateExisting(value, baseRecord);
    }

    protected class TimestampFunc extends TimestampFunction implements Function {

        @Override
        public long getTimestamp(Record rec) {
            return sampleLocalEpoch - tzOffset;
        }

        @Override
        public boolean isReadThreadSafe() {
            return false;
        }

        @Override
        public void toPlan(PlanSink sink) {
            sink.val("Timestamp");
        }
    }
}
